package org.codehaus.activemq;

import javax.jms.ConnectionConsumer;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.util.MemoryBoundedQueue;

/**
 * A {@link ConnectionConsumer} that receives messages dispatched by an
 * {@link ActiveMQConnection} and forwards each one either to a session taken
 * from a {@link ServerSessionPool} or, when no pool was supplied, to an
 * internal {@link MemoryBoundedQueue} that callers drain via
 * {@link #receive(long)}.
 */
public class ActiveMQConnectionConsumer
  implements ConnectionConsumer, ActiveMQMessageDispatcher
{
  private final ActiveMQConnection connection;
  private final ServerSessionPool sessionPool;
  private final ConsumerInfo consumerInfo;
  private boolean closed;
  // Stored from the constructor argument; not read anywhere in this class.
  private final int maximumMessages;
  protected MemoryBoundedQueue messageQueue;

  /**
   * Creates the consumer, registers it with the connection and sends the
   * (started) consumer info to the broker synchronously.
   *
   * @param theConnection connection that will dispatch messages to this consumer
   * @param theSessionPool pool used to obtain sessions for dispatch; when
   *        {@code null}, messages are enqueued on {@link #messageQueue} instead
   * @param theConsumerInfo description of this consumer sent over the connection
   * @param theMaximumMessages stored only; not otherwise used by this class
   * @throws JMSException if sending the consumer info fails; in that case the
   *         consumer is removed from the connection again before rethrowing
   */
  protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection, ServerSessionPool theSessionPool, ConsumerInfo theConsumerInfo, int theMaximumMessages)
    throws JMSException
  {
    this.connection = theConnection;
    this.sessionPool = theSessionPool;
    this.consumerInfo = theConsumerInfo;
    this.maximumMessages = theMaximumMessages;

    // Obtain the queue BEFORE registering with the connection: once registered
    // and started, dispatch() may be invoked and must never see a null queue.
    String queueName = this.connection.clientID + ":" + theConsumerInfo.getConsumerName() + ":" + theConsumerInfo.getConsumerNo();
    this.messageQueue = this.connection.getMemoryBoundedQueue(queueName);

    this.connection.addConnectionConsumer(this);
    try {
      this.consumerInfo.setStarted(true);
      this.connection.syncSendPacket(this.consumerInfo);
    }
    catch (JMSException jmsEx) {
      // Do not leave a half-constructed consumer registered with the connection.
      this.connection.removeConnectionConsumer(this);
      throw jmsEx;
    }
  }

  /**
   * Tells whether the given message is addressed to this consumer.
   *
   * @param message the message to test
   * @return the result of {@code message.isConsumerTarget(consumerNo)} for this
   *         consumer's number
   */
  public boolean isTarget(ActiveMQMessage message)
  {
    return message.isConsumerTarget(this.consumerInfo.getConsumerNo());
  }

  /**
   * Dispatches a message to this consumer if it is targeted at it. The message
   * is stamped with this consumer's id and then handed to a pooled server
   * session, or enqueued locally when there is no session pool. A
   * {@link JMSException} raised while dispatching is reported through
   * {@code connection.handleAsyncException} rather than thrown.
   *
   * @param message the message to dispatch; ignored when it does not target
   *        this consumer
   */
  public void dispatch(ActiveMQMessage message)
  {
    if (!message.isConsumerTarget(this.consumerInfo.getConsumerNo())) {
      return;
    }
    message.setConsumerId(this.consumerInfo.getConsumerId());
    try {
      if (this.sessionPool != null) {
        dispatchToSession(message);
      }
      else {
        dispatchToQueue(message);
      }
    }
    catch (JMSException jmsEx) {
      this.connection.handleAsyncException(jmsEx);
    }
  }

  /** Enqueues the message on the internal queue (used when no session pool is set). */
  private void dispatchToQueue(ActiveMQMessage message)
    throws JMSException
  {
    this.messageQueue.enqueue(message);
  }

  /**
   * Takes the next message from the internal queue.
   *
   * @param timeout passed straight to {@code messageQueue.dequeue};
   *        presumably milliseconds - confirm against MemoryBoundedQueue
   * @return the dequeued message, or {@code null} if the calling thread was
   *         interrupted while waiting (the interrupt status is restored)
   * @throws JMSException declared for callers; not thrown by the visible code
   */
  public ActiveMQMessage receive(long timeout)
    throws JMSException
  {
    try {
      return (ActiveMQMessage)this.messageQueue.dequeue(timeout);
    }
    catch (InterruptedException interrupted) {
      // Preserve the interrupt so callers further up the stack can react to it.
      Thread.currentThread().interrupt();
      return null;
    }
  }

  /**
   * Hands the message to a session obtained from the pool and then starts the
   * owning server session.
   */
  private void dispatchToSession(ActiveMQMessage message)
    throws JMSException
  {
    ServerSession serverSession = this.sessionPool.getServerSession();
    ActiveMQSession session = (ActiveMQSession)serverSession.getSession();

    // The message is queued on the session first; start() is called afterwards.
    session.dispatch(message);
    serverSession.start();
  }

  /**
   * Returns the server session pool given at construction.
   *
   * @return the session pool, possibly {@code null} if none was supplied
   * @throws JMSException an {@link IllegalStateException} if this consumer is closed
   */
  public ServerSessionPool getServerSessionPool()
    throws JMSException
  {
    if (this.closed) {
      throw new IllegalStateException("The Connection Consumer is closed");
    }
    return this.sessionPool;
  }

  /**
   * Closes this consumer: marks the consumer info as stopped, sends it
   * asynchronously over the connection and removes this consumer from the
   * connection. Calling it again after the first call has no effect.
   *
   * @throws JMSException if sending the packet or deregistering fails
   */
  public void close()
    throws JMSException
  {
    if (this.closed) {
      return;
    }
    this.closed = true;
    this.consumerInfo.setStarted(false);
    this.connection.asyncSendPacket(this.consumerInfo);
    this.connection.removeConnectionConsumer(this);
  }
}